golang 的 chan select 實在太方便,其實任何提供了協程的語言都能很好且方便的支持 chan 和 select,因爲經常寫 typescript 腳本,於是我把這兩個組件實現到了一個 typescript,你可以直接使用我的庫來得到 chan 和 select,本文後續是實現代碼的分析,你也可以參照分析去任何支持協程的語言中把golang的特性發揚光大
在上篇文章 我們已經有了一個 chan,現在來實現 case 和 select
golang 的 select 實際上接收了一個 case 的數組只是golang提供了語法糖讓它來更好書寫,我們作爲第三方庫就只能定義一個接收 case 數組的函數了
export function selectChan(...cases: Array<CaseLike | undefined>): Promise<CaseLike> | CaseLike | undefined ;
爲了讓 typescript 的提示更智能於是還添加了下面幾個簽名,(不叫 select 而叫 selectChan 是因爲 select 名字太特殊了我想使用一個儘量不會與其它庫重名的函數名 這樣大量使用時才比較方便)
export function selectChan(def: undefined, ...cases: Array<CaseLike>): CaseLike | undefined;
export function selectChan(...cases: Array<CaseLike>): Promise<CaseLike> | CaseLike;
export function selectChan(): Promise<any>;
CaseLike 是一個包含了 chan 讀寫上下文信息的 class,它可以實現對 chan 的讀寫以及註冊讀寫任務還有就是撤銷讀寫,
selectChan 實現就很簡單了
select{}
功能這樣和 switch 配合使用就可以在 js 中重現 golang 的 select 了,僞代碼大概像這樣
switch(await selectChan(c0,c1,c2)){
case c0:
break
case c1:
break
case c2:
break
}
switch(await selectChan(undefined,c0,c1)){
case c0:
break
case c1:
break
case undefined: // 執行 default 邏輯
break
}
下面是 selectChan 的實現代碼
export function selectChan(...cases: Array<CaseLike | undefined>): Promise<CaseLike> | CaseLike | undefined {
if (cases.length == 0) {
// 沒有傳入 case 所以 select 永遠阻塞
return neverPromise
} else if (cases.length > 1) {
shuffle(cases) // 打亂數組
}
// 重置 case 狀態
for (const c of cases) {
c?.reset()
}
// 檢查就緒的 case
let def = false
for (const c of cases) {
if (c === undefined) {
def = true
} else {
// 讀寫 完成
if (c.tryInvoke()) {
return c
}
}
}
// 沒有就緒 case 但設置了 default,返回 default case
if (def) {
return
} else if (cases.length == 1) {
// 只有一個 case
const c = cases[0] as CaseLike
return c.invoke().then(() => {
return c
})
}
// 存在多個 case
return new Promise((resolve, reject) => {
const arrs = cases as Array<CaseLike>
const conns = new Array<Connection>(arrs.length)
for (let i = 0; i < arrs.length; i++) {
conns[i] = arrs[i].do((c) => {
for (let i = 0; i < conns.length; i++) {
conns[i].disconet()
}
resolve(c)
}, () => {
for (let i = 0; i < conns.length; i++) {
conns[i].disconet()
}
reject(errChannelClosed)
})
}
})
}
首先看下 CaseLike 這個接口,它定義了 Case 要提供的功能
export interface CaseLike {
// 重置完成狀態,將 isReady 設置爲 fasle
reset(): void
// 類似 Writer/Reader 的 try 函數,嘗試是否可以立刻完成當前的 chan 操作
tryInvoke(): boolean
// 當不可以立刻完成時,此函數去註冊一個任務並使用 js 的 Promise 等待任務完成,用於 cases 數組長度爲 1 時
invoke(): Promise<void>
// 當不可以立刻完成時,註冊任務使用回調通知結果,返回值 Connection 可用於撤銷註冊的 case 任務
do(resolve: (c: CaseLike) => void, reject: (c: CaseLike) => void): Connection
// 對於讀取 case 這個函數返回了讀取到的值
read(): IteratorResult<any>
// 對於寫入 case 這個返回寫入是否成功了
write(): boolean
// 這個屬性返回 是否就緒,即select 等待完成的case 是否是這個
// 如果此值爲 false 則 CaseLike 的 read/write 函數會拋出異常,這是爲了強制把每次的 select 和 case 都關聯起來避免,用戶錯誤的把上次的 select 結果使用到後續 select 中
readonly isReady: boolean
}
下面是 case 的具體實現
/**
*
* @sealed
*/
export class Case<T>{
// 這個在 ts 中被定義爲內部,用於 class Chan 創建 Case 實例
static make<T>(ch: Chan<T>, r: boolean, val?: any, exception?: boolean): Case<T> {
return new Case<T>(ch, r, val, exception)
}
// 把構造函數定義爲私有避免 用戶擅自創建錯誤的 Case 實例
private constructor(private readonly ch: Chan<T>, // 關聯的 chan
private readonly r: boolean, // 這是一個 讀取/寫入 cahn
private readonly val?: any, // 寫入 chan 要寫入的值
private readonly exception?: boolean, // 寫入 chan 在 chan 關閉時 是要 返回 boolean 還是 throw 異常
) {
}
toString(): string {
if (this.r) {
return JSON.stringify({
case: 'read',
ready: this.isReady,
val: this.read_,
}, undefined, "\t")
} else {
return JSON.stringify({
case: 'write',
ready: this.isReady,
val: this.write_,
}, undefined, "\t")
}
}
// 重置 isReady 實現爲 false, 由 selectChan 函數調用
reset() {
if (this.r) {
this.read_ = undefined
} else {
this.write_ = undefined
}
}
// 這裏直接調用 chan 提供的 tryRead/tryWrite 來判斷是否可以立刻讀寫
tryInvoke(): boolean {
if (this.r) {
return this._tryRead()
} else {
return this._tryWrite()
}
}
// 爲讀寫註冊任務
do(resolve: (c: CaseLike) => void, reject: (c: CaseLike) => void): Connection {
const rw = this.ch.rw
if (this.r) {
return rw.read((val) => {
this.read_ = val
resolve(this)
})
} else {
return rw.write((ok) => {
if (ok) {
this.write_ = true
} else {
this.write_ = false
if (this.exception) {
reject(this)
return
}
}
resolve(this)
}, undefined, this.val)
}
}
// 執行讀寫並等待完成
invoke(): Promise<void> {
const rw = this.ch.rw
if (this.r) {
return new Promise((resolve) => {
rw.read((val) => {
this.read_ = val
resolve()
})
})
} else {
return new Promise((resolve, reject) => {
rw.write((ok) => {
if (ok) {
this.write_ = true
} else {
this.write_ = false
if (this.exception) {
reject(errChannelClosed)
return
}
}
resolve()
}, undefined, this.val)
})
}
}
// 調用 chan 的寫入,同時修改好自己的 ready 狀態
private _tryWrite(): boolean {
const ch = this.ch
const val = ch.tryWrite(this.val, this.exception)
if (val) {
this.write_ = true
return true
} else if (ch.isClosed) {
this.write_ = false
return true
}
return false
}
// 調用 chan 的讀取,同時修改好自己的 ready 狀態
private _tryRead(): boolean {
const val = this.ch.tryRead()
if (val == undefined) {
return false
}
this.read_ = val
return true
}
// 記錄讀取 case 讀取到的值
private read_?: IteratorResult<T>
// 讀取 case 才能調用用於返回讀取到的 值
read(): IteratorResult<T> {
const val = this.read_
if (val === undefined) {
throw errChanneReadCase
}
return val
}
// 記錄寫入 case 寫入是否成功
private write_?: boolean
// 寫入 case 才能調用用於記錄是否寫入成功
write(): boolean {
const val = this.write_
if (val === undefined) {
throw errChanneWriteCase
}
return val
}
// 返回 case 是否 就緒
get isReady(): boolean {
return this.r ? this.read_ !== undefined : this.write_ !== undefined
}
}
現在我們只需要爲 class Chan 提供一個 readCase 和 writeCase 函數用於創建 Case 實例即可使用 select 來等待了
class Chan{
// 爲當前 chan 創建一個 讀取 Case
readCase(): Case<T> {
return Case.make(this, true)
}
// 爲當前 chan 創建一個 寫入 Case 用來寫入 val
writeCase(val: T, exception?: boolean): Case<T> {
return Case.make(this, false, val, exception)
}
}